package ltd.nullpointer.tcp.core;

import com.boot2.core.HlAssert;
import com.boot2.core.expiringmap.ExpiringMap;
import com.boot2.core.utils.ReflectUtil;
import com.boot2.core.utils.StringUtils;
import io.netty.channel.Channel;
import lombok.extern.apachecommons.CommonsLog;
import ltd.nullpointer.tcp.core.annotation.DeviceSn;
import ltd.nullpointer.tcp.core.annotation.TcpDownMessage;
import ltd.nullpointer.tcp.core.annotation.TcpUpMessage;
import ltd.nullpointer.tcp.core.message.TcpMessageAdaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

/**
 * @author zhangweilin
 * @Description: TCP发送
 * @date 2019/12/20
 */
//@Service
@CommonsLog
public abstract class DefaultTcpMessageProducer implements TCPMessageProducer {

    @Autowired
    SessionManager sessionManager;

    /**
     * 给下发的报文缓存定时器，以用于下发后关闭电源用，且如果未收到回复，则超时关闭，此缓存的有效期即为超时时间
     * key为sensorNo_回复命令字，如  28xxxxxx_20
     */
//    Cache<String, Integer> cache = null;
    ExpiringMap<String, Integer> map = null;

    @PostConstruct
    public void init() {
//        cache = Caffeine.newBuilder()
//                .expireAfterWrite(15, TimeUnit.SECONDS)
//                .removalListener((e1, e2, e3) -> {
////                    System.out.println("key:" + e1 + ",value:" + e2 + ",删除原因:" + e3);
//                    String key = e1.toString();
//                    String[] keyArr = key.split("_");
//                    String sensorNo = keyArr[0];
//
//                    log.debug("关闭电源,key: " + key + ",关闭原因: " + e3);
//                    Channel channel = sessionManager.getChannelByDeviceId(sensorNo);
//                    turnOff(channel);
//                })
//                .build();

        map = ExpiringMap.builder()
                .expiration(10, TimeUnit.SECONDS)
                .variableExpiration()
                .expirationListener((k, v) -> {
//            System.out.println("过期,k = " + k + " , v: " + v);
                    String key = k.toString();
                    String[] keyArr = key.split("_");
                    String sensorNo = keyArr[0];

                    log.debug("下发关闭电源,key: " + key + ",v: " + v);
                    Channel channel = sessionManager.getChannelByDeviceId(sensorNo);
                    turnOff(channel);
                }).build();
    }

    /**
     * 获取在线通道
     *
     * @param deviceId
     * @return
     */
    private Channel getChannelByDeviceId(String deviceId) {
        Channel channel = sessionManager.getChannelByDeviceId(deviceId);
        HlAssert.notNull(channel, "该设备不在线，请求无法下达到设备,deviceId: "+deviceId);
        return channel;
    }

    /**
     * 发送给设备的实体
     *
     * @param deviceId
     * @param object
     */
    @Override
    @Async
    public void sendToDevice(String deviceId, Object object) {
        Channel channel = getChannelByDeviceId(deviceId);
        //上电及等待
        log.debug("打开电源,sensorNo: " + deviceId);
        turnOn(channel, object);

        log.debug("下发命令");
        //下发实际报文
        TcpMessageAdaptor<?> nPiotTCPMessage = new TcpMessageAdaptor<>(object);
        channel.writeAndFlush(nPiotTCPMessage);

        //将下发的报文加上定时器
        TcpDownMessage tcpDownMessage = object.getClass().getAnnotation(TcpDownMessage.class);
        if (null != tcpDownMessage) {
            String replyType = tcpDownMessage.replyType();
            //不用判断空，如果为空，表示不用理会回复，或没有回复，如果是没有回复的，下发后，更要在超时时间内关闭
            String key = deviceId + "_" + replyType;
            log.debug("此下发有回复，启动超时断电，key: " + key);
//            cache.put(key, 1);
            map.put(key, 1);
        }
    }

    @Override
    public void handReply(Object object) {
        TcpUpMessage tcpUpMessage = object.getClass().getAnnotation(TcpUpMessage.class);
        if (null != tcpUpMessage) {
            String[] messageCode = tcpUpMessage.messageCode();
            if (null != messageCode) {
                for (String messageCode0 : messageCode) {
                    String sensorNo = ReflectUtil.getFieldValue(object, DeviceSn.class);
                    //将缓存失效，会触发过期事件，会触发下发断电报文
//                    log.debug("处理上报回复,sensorNo: " + sensorNo + ", messageCode: " + ArrayUtils.toString(messageCode0));
                    if (StringUtils.isNotEmpty(sensorNo)) {
                        String key = sensorNo + "_" + messageCode0;
                        log.debug("处理上报回复,key: " + key);
//                    cache.invalidate(key);
                        map.setExpiration(key, 0, TimeUnit.MILLISECONDS);
                    }
                }
            }
        }
    }

    /**
     * 关闭电源
     *
     * @param sensorNo
     */
    public void turnOff(Channel channel) {

    }

    /**
     * 打开电源
     *
     * @param channel
     */
    public void turnOn(Channel channel, Object object) {

    }


//    private void turnOn(String deviceId) {
//        Channel channel = getChannelByDeviceId(deviceId);
//
//        TurnOn turnOn=new
//        TcpMessageAdaptor<?> nPiotTCPMessage = new TcpMessageAdaptor<>(object);
//        channel.writeAndFlush(nPiotTCPMessage);
//    }
}
